//---------------------------------------------------------- -*- Mode: C++ -*-
// $Id: ClientSM.cc 34 2007-10-18 21:06:43Z sriramsrao $ 
//
// Created 2006/03/23
// Author: Sriram Rao (Kosmix Corp.) 
//
// Copyright 2006 Kosmix Corp.
//
// This file is part of Kosmos File System (KFS).
//
// Licensed under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
//
// 
//----------------------------------------------------------------------------

#include "ClientSM.h"

#include "ChunkManager.h"
#include "ChunkServer.h"
#include "Utils.h"

#include <string>
#include <sstream>
using std::string;
using std::ostringstream;

#include "common/log.h"
#include <boost/scoped_array.hpp>
using boost::scoped_array;

using namespace KFS;

ClientSM::ClientSM(NetConnectionPtr &conn) 
{
    mNetConnection = conn;
    SET_HANDLER(this, &ClientSM::HandleRequest);
}

ClientSM::~ClientSM()
{
    KfsOp *op;

    assert(mOps.empty());
    while (!mOps.empty()) {
        op = mOps.front();
        mOps.pop_front();
        delete op;
    }
    gClientManager.Remove(this);
}

///
/// Send out the response to the client request.  The response is
/// generated by MetaRequest as per the protocol.
/// @param[in] op The request for which we finished execution.
///
void
ClientSM::SendResponse(KfsOp *op)
{
    ostringstream os;
    ReadOp *rop;
    string s = op->Show();

    op->Response(os);

    KFS_LOG_VA_DEBUG("Command %s: Response status: %d\n", 
                     s.c_str(), op->status);

    mNetConnection->Write(os.str().c_str(), os.str().length());
    if (op->op == CMD_READ) {
        // need to send out the data read
        rop = static_cast<ReadOp *> (op);
        if (op->status >= 0) {
            KFS_LOG_VA_DEBUG("Bytes avail from read: %d\n",
                             rop->dataBuf->BytesConsumable());
            assert(rop->dataBuf->BytesConsumable() == rop->status);
            mNetConnection->Write(rop->dataBuf, rop->numBytesIO);
        }
    }
}

///
/// Generic event handler.  Decode the event that occurred and
/// appropriately extract out the data and deal with the event.
/// @param[in] code: The type of event that occurred
/// @param[in] data: Data being passed in relative to the event that
/// occurred.
/// @retval 0 to indicate successful event handling; -1 otherwise.
///
int
ClientSM::HandleRequest(int code, void *data)
{
    IOBuffer *iobuf;
    KfsOp *op;
    int cmdLen;

    switch (code) {
    case EVENT_NET_READ:
	// We read something from the network.  Run the RPC that
	// came in.
	iobuf = (IOBuffer *) data;
	while (IsMsgAvail(iobuf, &cmdLen)) {
	    // if we don't have all the data for the command, wait
	    if (!HandleClientCmd(iobuf, cmdLen))
		break;
	}
	break;

    case EVENT_NET_WROTE:
	// Something went out on the network.  For now, we don't
	// track it. Later, we may use it for tracking throttling
	// and such.
	break;

    case EVENT_CMD_DONE:
	// An op finished execution.  Send a response back
	op = (KfsOp *) data;
	op->done = true;
	if (op != mOps.front())
	    break;
	while (!mOps.empty()) {
	    op = mOps.front();
	    if (!op->done)
		break;
	    SendResponse(op);
	    // we are done with the op
	    mOps.pop_front();
	    delete op;
	}
	break;

    case EVENT_NET_ERROR:
	// KFS_LOG_VA_DEBUG("Closing connection");

	if (mNetConnection)
	    mNetConnection->Close();

        // wait for the ops to finish
        SET_HANDLER(this, &ClientSM::HandleTerminate);
        if (mOps.empty()) {
            delete this;
        }

	break;

    default:
	assert(!"Unknown event");
	break;
    }
    return 0;
}

///
/// Termination handler.  For the client state machine, we could have
/// ops queued at the logger.  So, for cleanup wait for all the
/// outstanding ops to finish and then delete this.  In this state,
/// the only event that gets raised is that an op finished; anything
/// else is bad.
///
int
ClientSM::HandleTerminate(int code, void *data)
{
    KfsOp *op;

    switch (code) {
    case EVENT_CMD_DONE:
	// An op finished execution.  Send a response back
	op = (KfsOp *) data;
	op->done = true;
	if (op != mOps.front())
	    break;
	while (!mOps.empty()) {
	    op = mOps.front();
	    if (!op->done)
		break;
	    // we are done with the op
	    mOps.pop_front();
	    delete op;
	}
	break;
    default:
	assert(!"Unknown event");
	break;
    }
    if (mOps.empty()) {
        // all ops are done...so, now, we can nuke ourself.
        delete this;
    }
    return 0;
}

///
/// We have a command in a buffer.  It is possible that we don't have
/// everything we need to execute it (for example, for a write we may
/// not have received all the data the client promised).  So, parse
/// out the command and if we have everything execute it.
/// 

bool
ClientSM::HandleClientCmd(IOBuffer *iobuf,
                          int cmdLen)
{
    scoped_array<char> buf;
    KfsOp *op;
    size_t nAvail;

    buf.reset(new char[cmdLen + 1]);
    iobuf->CopyOut(buf.get(), cmdLen);
    buf[cmdLen] = '\0';
    
    if (ParseCommand(buf.get(), cmdLen, &op) != 0) {
        iobuf->Consume(cmdLen);

        KFS_LOG_VA_DEBUG("Aye?: %s", buf.get());
        // got a bogus command
        return true;
    }

    if (op->op == CMD_WRITE_PREPARE) {
        WritePrepareOp *wop = static_cast<WritePrepareOp *> (op);

        assert(wop != NULL);

        // if we don't have all the data for the write, hold on...
        nAvail = iobuf->BytesConsumable() - cmdLen;        
        if (nAvail < wop->numBytes) {
            delete op;
            // we couldn't process the command...so, wait
            return false;
        }
        iobuf->Consume(cmdLen);
        wop->dataBuf = new IOBuffer();

        wop->dataBuf->Move(iobuf, wop->numBytes);
        nAvail = wop->dataBuf->BytesConsumable();
        // KFS_LOG_VA_DEBUG("Got command: %s", buf.get());

        KFS_LOG_VA_DEBUG("# of bytes avail for write: %lu", nAvail);
    } else {
        string s = op->Show();

        KFS_LOG_VA_DEBUG("Got command: %s\n", s.c_str());

        iobuf->Consume(cmdLen);
    }

    mOps.push_back(op);

    op->clnt = this;
    op->Execute();
    
    return true;
}
